{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"images/dask_horizontal.svg\" align=\"right\" width=\"30%\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Distributed, Advanced"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Distributed futures"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client\n",
    "c = Client(n_workers=4)\n",
    "c.cluster"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In the previous chapter, we showed that executing a calculation (created using delayed) with the distributed executor is identical to any other executor. However, we now have access to additional functionality, and control over what data is held in memory.\n",
    "\n",
    "To begin, the `futures` interface (derived from the built-in `concurrent.futures`) allows map-reduce like functionality. We can submit individual functions for evaluation with one set of inputs, or evaluated over a sequence of inputs with `submit()` and `map()`. Notice that the call returns immediately, giving one or more *futures*, whose status begins as \"pending\" and later becomes \"finished\". There is no blocking of the local Python session."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here is the simplest example of `submit` in action:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def inc(x):\n",
    "    return x + 1\n",
    "\n",
    "fut = c.submit(inc, 1)\n",
    "fut"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can re-execute the following cell as often as we want as a way to poll the status of the future. This could of course be done in a loop, pausing for a short time on each iteration. We could continue with our work, or view a progressbar of work still going on, or force a wait until the future is ready. \n",
    "\n",
    "In the meantime, the `status` dashboard (link above next to the Cluster widget) has gained a new element in the task stream, indicating that `inc()` has completed, and the progress section at the problem shows one task complete and held in memory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "fut"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Possible alternatives you could investigate:\n",
    "```python\n",
    "from dask.distributed import wait, progress\n",
    "progress(fut)\n",
    "```\n",
    "would show a progress bar in *this* notebook, rather than having to go to the dashboard. This progress bar is also asynchronous, and doesn't block the execution of other code in the meanwhile.\n",
    "\n",
    "```python\n",
    "wait(fut)\n",
    "```\n",
    "would block and force the notebook to wait until the computation pointed to by `fut` was done. However, note that the result of `inc()` is sitting in the cluster, it would take **no time** to execute the computation now, because Dask notices that we are asking for the result of a computation it already knows about. More on this later."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# grab the information back - this blocks if fut is not ready\n",
    "c.gather(fut)\n",
    "# equivalent action when only considering a single future\n",
    "# fut.result()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we see an alternative way to execute work on the cluster: when you submit or map with the inputs as futures, the *computation moves to the data* rather than the other way around, and the client, in the local Python session, need never see the intermediate values. This is similar to building the graph using delayed, and indeed, delayed can be used in conjunction with futures. Here we use the delayed object `total` from before."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Some trivial work that takes time\n",
    "# repeated from the Distributed chapter.\n",
    "\n",
    "from dask import delayed\n",
    "import time\n",
    "\n",
    "def inc(x):\n",
    "    time.sleep(5)\n",
    "    return x + 1\n",
    "\n",
    "def dec(x):\n",
    "    time.sleep(3)\n",
    "    return x - 1\n",
    "\n",
    "def add(x, y):\n",
    "    time.sleep(7)\n",
    "    return x + y\n",
    "\n",
    "x = delayed(inc)(1)\n",
    "y = delayed(dec)(2)\n",
    "total = delayed(add)(x, y)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# notice the difference from total.compute()\n",
    "# notice that this cell completes immediately\n",
    "fut = c.compute(total)\n",
    "fut"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "c.gather(fut) # waits until result is ready"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### `Client.submit`\n",
    "\n",
    "`submit` takes a function and arguments, pushes these to the cluster, returning a *Future* representing the result to be computed. The function is passed to a worker process for evaluation. Note that this cell returns immediately, while computation may still be ongoing on the cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "fut = c.submit(inc, 1)\n",
    "fut"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This looks a lot like doing `compute()`, above, except now we are passing the function and arguments directly to the cluster. To anyone used to `concurrent.futures`, this will look familiar. This new `fut` behaves the same way as the one above. Note that we have now over-written the previous definition of `fut`, which will get garbage-collected, and, as a result, that previous result is released by the cluster\n",
    "\n",
    "### Exercise: Rebuild the above delayed computation using `Client.submit` instead\n",
    "\n",
    "The arguments passed to `submit` can be futures from other submit operations or delayed objects. The former, in particular, demonstrated the concept of *moving the computation to the data* which is one of the most powerful elements of programming with Dask.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Your code here"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "source_hidden": true
    }
   },
   "outputs": [],
   "source": [
    "x = c.submit(inc, 1)\n",
    "y = c.submit(dec, 2)\n",
    "total = c.submit(add, x, y)\n",
    "\n",
    "print(total)     # This is still a future\n",
    "c.gather(total)   # This blocks until the computation has finished\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Each futures represents a result held, or being evaluated by the cluster. Thus we can control caching of intermediate values - when a future is no longer referenced, its value is forgotten. In the solution, above, futures are held for each of the function calls. These results would not need to be re-evaluated if we chose to submit more work that needed them.\n",
    "\n",
    "We can explicitly pass data from our local session into the cluster using `scatter()`, but usually better is to construct functions that do the loading of data within the workers themselves, so that there is no need to serialise and communicate the data. Most of the loading functions within Dask, sudh as `dd.read_csv`, work this way. Similarly, we normally don't want to `gather()` results that are too big in memory.\n",
    "\n",
    "The [full API](http://distributed.readthedocs.io/en/latest/api.html) of the distributed scheduler gives details of interacting with the cluster, which remember, can be on your local machine or possibly on a massive computational resource. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The futures API offers a work submission style that can easily emulate the map/reduce paradigm (see `c.map()`) that may be familiar to many people. The intermediate results, represented by futures, can be passed to new tasks without having to bring the pull locally from the cluster, and new work can be assigned to work on the output of previous jobs that haven't even begun yet.\n",
    "\n",
    "Generally, any Dask operation that is executed using `.compute()` can be submitted for asynchronous execution using `c.compute()` instead, and this applies to all collections. Here is an example with the calculation previously seen in the Bag chapter. We have replaced the `.compute()` method there with the distributed client version, so, again, we could continue to submit more work (perhaps based on the result of the calculation), or, in the next cell, follow the progress of the computation. A similar progress-bar appears in the monitoring UI page."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%run prep.py -d accounts"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask.bag as db\n",
    "import os\n",
    "import json\n",
    "filename = os.path.join('data', 'accounts.*.json.gz')\n",
    "lines = db.read_text(filename)\n",
    "js = lines.map(json.loads)\n",
    "\n",
    "f = c.compute(js.filter(lambda record: record['name'] == 'Alice')\n",
    "       .pluck('transactions')\n",
    "       .flatten()\n",
    "       .pluck('amount')\n",
    "       .mean())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import progress\n",
    "# note that progress must be the last line of a cell\n",
    "# in order to show up\n",
    "progress(f)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# get result.\n",
    "c.gather(f)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# release values by deleting the futures\n",
    "del f, fut, x, y, total"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Persist"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Considering which data should be loaded by the workers, as opposed to passed, and which intermediate values to persist in worker memory, will in many cases determine the computation efficiency of a process.\n",
    "\n",
    "In the example here, we repeat a calculation from the Array chapter - notice that each call to `compute()` is roughly the same speed, because the loading of the data is included every time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%run prep.py -d random"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import h5py\n",
    "import os\n",
    "f = h5py.File(os.path.join('data', 'random.hdf5'), mode='r')\n",
    "dset = f['/x']\n",
    "import dask.array as da\n",
    "x = da.from_array(dset, chunks=(1000000,))\n",
    "\n",
    "%time x.sum().compute()\n",
    "%time x.sum().compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If, instead, we persist the data to RAM up front (this takes a few seconds to complete - we could `wait()` on this process), then further computations will be much faster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# changes x from a set of delayed prescriptions\n",
    "# to a set of futures pointing to data in RAM\n",
    "# See this on the UI dashboard.\n",
    "x = c.persist(x)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%time x.sum().compute()\n",
    "%time x.sum().compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Naturally, persisting every intermediate along the way is a bad idea, because this will tend to fill up all available RAM and make the whole system slow (or break!). The ideal persist point is often at the end of a set of data cleaning steps, when the data is in a form which will get queried often. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Exercise**: how is the memory associated with `x` released, once we know we are done with it?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Asynchronous computation\n",
    "<img style=\"float: right;\" src=\"https://upload.wikimedia.org/wikipedia/commons/thumb/3/32/Rosenbrock_function.svg/450px-Rosenbrock_function.svg.png\" height=200 width=200>\n",
    "\n",
    "One benefit of using the futures API is that you can have dynamic computations that adjust as things progress. Here we implement a simple naive search by looping through results as they come in, and submit new points to compute as others are still running.\n",
    "\n",
    "Watching the [diagnostics dashboard](../../9002/status) as this runs you can see computations are being concurrently run while more are being submitted. This flexibility can be useful for parallel algorithms that require some level of synchronization.\n",
    "\n",
    "Lets perform a very simple minimization using dynamic programming. The function of interest is known as Rosenbrock:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# a simple function with interesting minima\n",
    "import time\n",
    "\n",
    "def rosenbrock(point):\n",
    "    \"\"\"Compute the rosenbrock function and return the point and result\"\"\"\n",
    "    time.sleep(0.1)\n",
    "    score = (1 - point[0])**2 + 2 * (point[1] - point[0]**2)**2\n",
    "    return point, score"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Initial setup, including creating a graphical figure. We use Bokeh for this, which allows for dynamic update of the figure as results come in. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from bokeh.io import output_notebook, push_notebook\n",
    "from bokeh.models.sources import ColumnDataSource\n",
    "from bokeh.plotting import figure, show\n",
    "import numpy as np\n",
    "output_notebook()\n",
    "\n",
    "# set up plot background\n",
    "N = 500\n",
    "x = np.linspace(-5, 5, N)\n",
    "y = np.linspace(-5, 5, N)\n",
    "xx, yy = np.meshgrid(x, y)\n",
    "d = (1 - xx)**2 + 2 * (yy - xx**2)**2\n",
    "d = np.log(d)\n",
    "\n",
    "p = figure(x_range=(-5, 5), y_range=(-5, 5))\n",
    "p.image(image=[d], x=-5, y=-5, dw=10, dh=10, palette=\"Spectral11\");"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We start off with a point at (0, 0), and randomly scatter test points around it. Each evaluation takes ~100ms, and as result come in, we test to see if we have a new best point, and choose random points around that new best point, as the search box shrinks.\n",
    "\n",
    "We print the function value and current best location each time we have a new best value."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import as_completed\n",
    "from random import uniform\n",
    "\n",
    "scale = 5                  # Intial random perturbation scale\n",
    "best_point = (0, 0)        # Initial guess\n",
    "best_score = float('inf')  # Best score so far\n",
    "startx = [uniform(-scale, scale) for _ in range(10)]\n",
    "starty = [uniform(-scale, scale) for _ in range(10)]\n",
    "\n",
    "# set up plot\n",
    "source = ColumnDataSource({'x': startx, 'y': starty, 'c': ['grey'] * 10})\n",
    "p.circle(source=source, x='x', y='y', color='c')\n",
    "t = show(p, notebook_handle=True)\n",
    "\n",
    "# initial 10 random points\n",
    "futures = [c.submit(rosenbrock, (x, y)) for x, y in zip(startx, starty)]\n",
    "iterator = as_completed(futures)\n",
    "\n",
    "for res in iterator:\n",
    "    # take a completed point, is it an improvement?\n",
    "    point, score = res.result()\n",
    "    if score < best_score:\n",
    "        best_score, best_point = score, point\n",
    "        print(score, point)\n",
    "\n",
    "    x, y = best_point\n",
    "    newx, newy = (x + uniform(-scale, scale), y + uniform(-scale, scale))\n",
    "    \n",
    "    # update plot\n",
    "    source.stream({'x': [newx], 'y': [newy], 'c': ['grey']}, rollover=20)\n",
    "    push_notebook(document=t)\n",
    "    \n",
    "    # add new point, dynamically, to work on the cluster\n",
    "    new_point = c.submit(rosenbrock, (newx, newy))\n",
    "    iterator.add(new_point)  # Start tracking new task as well\n",
    "\n",
    "    # Narrow search and consider stopping\n",
    "    scale *= 0.99\n",
    "    if scale < 0.001:\n",
    "        break\n",
    "point"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Debugging"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When something goes wrong in a distributed job, it is hard to figure out what the problem was and what to do about it. When a task raises an exception, the exception will show up when that result, or other result that depend upon it, is gathered.\n",
    "\n",
    "Consider the following delayed calculation to be computed by the cluster. As usual, we get back a future, which the cluster is working on to compute (this happens very slowly for the trivial procedure)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@delayed\n",
    "def ratio(a, b):\n",
    "    return a // b\n",
    "\n",
    "ina = [5, 25, 30]\n",
    "inb = [5, 5, 6]\n",
    "out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])\n",
    "f = c.compute(out)\n",
    "f"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We only get to know what happened when we gather the result (this is also true for `out.compute()`, except we could not have done other stuff in the meantime). For the first set of inputs, it works fine."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "c.gather(f)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "But if we introduce bad input, an exception is raised. The exception happens in `ratio`, but only comes to our attention when calculating the sum."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "raises-exception"
    ]
   },
   "outputs": [],
   "source": [
    "ina = [5, 25, 30]\n",
    "inb = [5, 0, 6]\n",
    "out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])\n",
    "f = c.compute(out)\n",
    "c.gather(f)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The display in this case makes the origin of the exception obvious, but this is not always the case. How should this be debugged, how would we go about finding out the exact conditions that caused the exception? \n",
    "\n",
    "The first step, of course, is to write well-tested code which makes appropriate assertions about its input and clear warnings and error messages when something goes wrong. This applies to all code.\n",
    "\n",
    "The most typical thing to do is to execute some portion of the computation in the local thread, so that we can run the Python debugger and query the state of things at the time that the exception happened. Obviously, this cannot be performed on the whole data-set when dealing with Big Data on a cluster, but a suitable sample will probably do even then."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "raises-exception"
    ]
   },
   "outputs": [],
   "source": [
    "import dask\n",
    "with dask.config.set(scheduler=\"sync\"):\n",
    "    # do NOT use c.compute(out) here - we specifically do not\n",
    "    # want the distributed scheduler\n",
    "    out.compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# uncomment to enter post-mortem debugger\n",
    "# %debug"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The trouble with this approach is that Dask is meant for the execution of large datasets/computations - you probably can't simply run the whole thing \n",
    "in one local thread, else you wouldn't have used Dask in the first place. So the code above should only be used on a small part of the data that also exihibits the error. \n",
    "Furthermore, the method will not work when you are dealing with futures (such as `f`, above, or after persisting) instead of delayed-based computations.\n",
    "\n",
    "As an alternative, you can ask the scheduler to analyze your calculation and find the specific sub-task responsible for the error, and pull only it and its dependnecies locally for execution."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "raises-exception"
    ]
   },
   "outputs": [],
   "source": [
    "c.recreate_error_locally(f)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# uncomment to enter post-mortem debugger\n",
    "# %debug"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, there are errors other than exceptions, when we need to look at the state of the scheduler/workers. In the standard \"LocalCluster\" we started, we\n",
    "have direct access to these."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "[(k, v.state) for k, v in c.cluster.scheduler.tasks.items() if v.exception is not None]"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
